Skip to content

feat: Prunable shard specs for streaming published segments#19571

Merged
abhishekrb19 merged 12 commits into
apache:masterfrom
abhishekrb19:streaming_partitioned_dims
Jun 12, 2026
Merged

feat: Prunable shard specs for streaming published segments#19571
abhishekrb19 merged 12 commits into
apache:masterfrom
abhishekrb19:streaming_partitioned_dims

Conversation

@abhishekrb19

@abhishekrb19 abhishekrb19 commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Relates to #12929

Streaming-published segments currently have numbered shard specs, which aren't prunable by design. Compaction must reindex the data with range or hashed partition strategy once the data is handed off — even if the topic is partitioned, which
is easy to do with multiple supervisors. For multi-tenant datasources this means every tenant-filtered query hits every recent segment regardless of the partitioning strategy for numbered shard specs.

This PR lets streaming tasks record, per published segment, the distinct values observed for a configured set of dimensions, and declare them on a new shard spec so the broker can prune near-realtime data without waiting for compaction to
reindex handed off segments. Concurrent compaction cannot always keep up with the incoming data and additionally the compaction process itself takes time to reindex; so the benefits of range or hashed shard specs may not be fully realized for
however long it takes to reindex (30-45 minutes in our case), and doesn't help with high concurrent query workloads that are only querying more recent data.

So this PR adds a way to publish prunable shards right off the bat when they're handed off by streaming tasks, if configured. This functionality is opt-in, Kafka-only, and disabled by default.

Design

  • DimensionValueSetShardSpec (type: "dim_value_set") — extends NumberedShardSpec (behaves as a normal append segment) plus a partitionDimensionValues map (dimension → observed values). possibleInDomain() prunes a segment when the query
    constrains a declared dimension and none of its values intersect the domain; a dimension not in partitionDimensionValues is never pruned on. Set-based (not min/max), so it prunes precisely for sparse values and tolerates overlapping value
    sets across tasks/restarts.
  • Because partition values are observed at ingestion rather than hardcoded, incorrect or abruptly-changing partitioning never breaks correctness — at worst it yields non-prunable shard specs (similar to the default numbered shard) or bloated
    shard specs (this caveat can be addressed with a guard rail noted below).
  • Ingestion (SeekableStreamIndexTaskRunner) — when partitionDimensions is set, the task accumulates observed values per segment and stamps each at publish.

Configuration

New optional tuningConfig field streamingPartitionsSpec on the Kafka supervisor/task (default null), with a partitionDimensions list. When unset, behavior is unchanged. Documented in docs/ingestion/kafka-ingestion.md.

Compatibility

Backward-compatible and opt-in. But dim_value_set is a new core ShardSpec type with no defaultImpl fallback, so it is not forward-compatible: upgrade all services before enabling streamingPartitionsSpec, and note that once dim_value_set
segments are published, downgrade isn't supported until they're compacted away.

Results

Tested in a cluster, where I saw up to ~40% reduction in segment scans on the historicals for a few low to medium cardinal partition dimensions. In a follow-up, I want to extend this to also prune tasks, for reduced peon buffers and better
query performance at the task layer.

Caveats

There's currently no limit on the number of observed values stamped into a segment's partitionDimensionValues. It may make sense, in a follow-up, to add a configurable guardrail that falls back to NumberedShardSpec when the count exceeds a
threshold, so shard specs don't get bloated.

Release note

Kafka ingestion can now publish segments that the broker prunes at query time, without waiting for compaction. Set tuningConfig.streamingPartitionsSpec.partitionDimensions to a list of low-to-medium cardinality dimensions; each task records
the distinct values it observes per dimension and stamps them onto a new dim_value_set shard spec. Queries that filter on a declared dimension then skip segments whose values can't match. The feature is opt-in, Kafka-only, and disabled by
default; when unset, behavior is unchanged.

Compatibility: dim_value_set is a new core shard spec type with no fallback, so it is not forward-compatible. Upgrade all services before enabling streamingPartitionsSpec. Once dim_value_set segments are published, downgrade is unsupported
until they are compacted away or streamingPartitionsSpec is removed.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

…gestion

Adds a new core ShardSpec (stream_range) that lets Kafka streaming tasks
declare, per published segment, the distinct values observed for configured
partitionFilterDimensions. The broker uses these to prune segments whose
declared values cannot match a query filter — enabling near-realtime pruning
without waiting for compaction.

Highlights:
- StreamRangeShardSpec extends NumberedShardSpec; possibleInDomain prunes by
  per-value range intersection. Null is declared as a first-class value
  (encoded as Range.lessThan("")) so IS NULL queries are never wrongly pruned,
  and is kept distinct from the empty string.
- Opt-in via partitionFilterDimensions on the Kafka supervisor/IOConfig
  (null by default; segments otherwise get a plain NumberedShardSpec). Kafka
  only for now; backward-compatible config (old specs/constructors unchanged).
- Per-segment value accumulation at ingest time; each segment is stamped with
  only its own observed values at publish.
- Correctness guards: restart-spanning segments fall back to NumberedShardSpec
  (pre-restart rows are not re-read, so their values can't be fully observed);
  dimensions that observed a null/missing value declare null so IS NULL is not
  pruned.
- BaseAppenderatorDriver reconciles the returned SegmentsAndCommitMetadata to
  the published shard specs so handoff/publish logs report the real spec.

Tests:
- StreamRangeShardSpecTest: possibleInDomain matrix incl. null vs "" and serde.
- SeekableStreamIndexTaskRunnerTest: annotator unit tests (restart fallback,
  null handling).
- EmbeddedStreamRangeShardSpecTest: end-to-end pruning verified via the
  query/segment/time scan metric across a predicate matrix (=, !=, IN, NOT IN,
  IS NULL, IS NOT NULL, multi-value, untracked dimension, non-existent value),
  plus a no-partitioning control twin and in-memory/graceful-widening cases.
- StreamAppenderatorDriverTest: returned metadata carries the published spec.

// annotateSegmentWithPartitionFilters is a no-op (returns the segment unchanged) when partition filters are not
// configured, so it is always safe to apply here.
final java.util.function.Function<Set<DataSegment>, Set<DataSegment>> shardSpecAnnotator =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can add Function to imports


for (DataSegment segment : publishedSegmentsAndCommitMetadata.getSegments()) {
observedDimensionValuesBySegment.remove(
SegmentIdWithShardSpec.fromDataSegment(segment).toString()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also clean up restartSpannedSegments.remove here?

return s;
}
final Map<String, List<String>> snapshotFilters = new HashMap<>();
for (String dim : filterDims) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (String dim : filterDims) {
    segObserved.computeIfPresent(dim, (k, vals) -> {
      synchronized (vals) {
        if (!vals.isEmpty()) {
          snapshotFilters.put(dim, new ArrayList<>(vals));
        }
      }
      return vals;  // Return unchanged - we're just reading
    });
  }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude recommendation for race condition

Comment thread docs/ingestion/kafka-ingestion.md Outdated
|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100|
|`useEarliestOffset`|Boolean|If a supervisor is managing a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether the supervisor retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended so this flag is only used on the first run.|No|`false`|
|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle configuration](#idle-configuration) for more details.|No|null|
|`partitionFilterDimensions`|List of String|Dimensions to track for query-time segment pruning. See [Partition filter dimensions](#partition-filter-dimensions) for details.|No|null|

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about naming this partitionDimensions to align with the compaction config? That may make it more clear that those values should be in sync

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good call - thanks for the suggestion! I also moved this into the tuningConfig for consistency. It's nested under a streamingPartitionsSpec container to avoid naming ambiguity and so we can extend it with additional properties and/or add new types in the future if needed.

Comment thread docs/ingestion/kafka-ingestion.md Outdated
- Use only low-to-medium cardinality dimensions (for example, `tenant_id`, `region`, `environment`). High-cardinality dimensions bloat segment metadata with no pruning benefit.
- Most effective when Kafka partitions are keyed by the tracked dimension (for example, using tenant ID as the message key). Each task naturally sees a subset of values, and segments get tight filter annotations.
- Also works with multiple supervisors reading from separate topics into one datasource.
- After compaction, the `StreamRangeShardSpec` annotations are replaced by the compaction output's shard spec (hash or range partitioning), which provides its own pruning.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth mentioning that when using partitionFilterDimensions, dynamic compaction strategy should not be used

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 1
P2 0
P3 1
Total 2

Reviewed 16 of 16 changed files.

I found two issues: restart-spanning segments can mix shard spec classes within one publish interval and fail publishing, and Kafka backfill specs drop partitionFilterDimensions.


This is an automated review by Codex GPT-5.5

return s;
}
final String lookupKey = SegmentIdWithShardSpec.fromDataSegment(s).toString();
if (restartSpannedSegments.contains(lookupKey)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Mixed shard specs can fail publish after restart

Restart-spanned segments return unchanged as NumberedShardSpec, while new same-interval segments in the same publish batch can be annotated as StreamRangeShardSpec. TransactionalSegmentPublisher then runs SegmentPublisherHelper.annotateShardSpec, which rejects mixed shard-spec classes per interval, so a restarted task can fail publish/handoff. Make the fallback interval-wide, or stamp restored segments with a non-pruning stream_range spec.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is fixed with some test coverage added.

emitTimeLagMetrics,
serverPriorityToReplicas,
boundedStreamConfig,
null

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Backfill specs drop partitionFilterDimensions

This compatibility constructor always forwards null for partitionFilterDimensions. KafkaSupervisorSpec.createBackfillSpec still uses this overload when deriving bounded backfill specs, so a supervisor configured with partitionFilterDimensions silently creates backfill tasks without the pruning annotations. Pass the existing dimension list through for backfill specs.

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 0
P3 1
Total 1

Reviewed 19 of 19 changed files.


This is an automated review by Codex GPT-5.5

if (vals.isEmpty()) {
continue;
}
snapshot = new ArrayList<>(vals);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P3] Sort observed partition values before publishing

The list stored in partitionDimensionValues is created directly from a HashSet, so its order is unspecified. The embedded test already asserts a concrete order for these values, and equivalent published segment metadata can vary by JVM or run even when the value set is identical. Sort the snapshot deterministically, with explicit null handling, before putting it into the shard spec.

Reviewed 19 of 19 changed files.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

},
"tuningConfig": {
"type": "kafka",
"streamingPartitionsSpec": {"partitionDimensions": ["tenant"]}

@aho135 aho135 Jun 11, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Makes sense to have this in tuningConfig, and in the future we can add cardinality guardrails into streamingPartitionsSpec as well

);
Assert.assertEquals(List.of("tenant", "region"), config.getStreamingPartitionsSpec().getPartitionDimensions());
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth having a test case which covers edge case configuration for partitionDimensions (e.g. null values, empty string, integers)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth having a test case which covers edge case configuration for partitionDimensions (e.g. null values, empty string

I've added some tests in this unit test and there's some existing end-to-end coverage in the embedded ones too for null, empty cases

Re numeric types, they're currently unsupported for the range shard spec as well, given how the filters work - some details here: #19415. For now it's just documented for this new shard spec, but it would be nice to expand that functionality generally so all shard specs benefit from it; if that's not trivial, we could block them at creation time at least for this new shard spec in the future

snapshot.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
snapshotFilters.put(dim, snapshot);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a follow up, it may be useful to emit metric here for the cardinality of observedPartitionDimValuesBySegment.get(segmentId)

final TransactionalSegmentPublisher publisher,
final Committer committer,
final Collection<String> sequenceNames,
final java.util.function.Function<Set<DataSegment>, Set<DataSegment>> segmentAnnotateFunction

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.util.function.Function can also be imported since it's used twice

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I tried using Guava's Function here (which is used in the class below), but it doesn't seem to fit, hence the inline java.util.function.Function

@aho135 aho135 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few minor comments, but otherwise LGTM. Thanks @abhishekrb19!

@FrankChen021 FrankChen021 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 5 of 20 changed files (6 files total inspected) for this reply-needed follow-up.

Confirmed the two existing review follow-ups are addressed in current code: observed partition values are now sorted deterministically with explicit null handling, and restart-spanned segments now use empty-filter StreamRangeShardSpec behavior with relevant task-runner and publisher-helper test coverage.

No inline reply looks necessary.


This is an automated review by Codex GPT-5.5

@abhishekrb19

Copy link
Copy Markdown
Contributor Author

Thanks for the reviews @aho135 @FrankChen021! I also went ahead and rename the shard spec type from stream_range to dim_value_set as it better aligns with what it does.

@abhishekrb19 abhishekrb19 merged commit ce74072 into apache:master Jun 12, 2026
38 checks passed
@abhishekrb19 abhishekrb19 deleted the streaming_partitioned_dims branch June 12, 2026 20:56
@github-actions github-actions Bot added this to the 38.0.0 milestone Jun 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants